#include <fcntl.h>
#include "HSHA.h"


HSHA::HSHA(int threads) :TcpServer(), threadPool_(threads)
{
	int r = pipe(wakeupFds_);
	fatalif(r, "pipe failed %d %s", errno, strerror(errno));
	r = util::addFdFlag(wakeupFds_[0], FD_CLOEXEC);
	fatalif(r, "addFdFlag failed %d %s", errno, strerror(errno));
	r = util::addFdFlag(wakeupFds_[1], FD_CLOEXEC);
	fatalif(r, "addFdFlag failed %d %s", errno, strerror(errno));
	trace("wakeup pipe created %d %d", wakeupFds_[0], wakeupFds_[1]);
	TcpConHandlerPtr handler(new TcpConHandler(wakeupFds_[0]));
	_handler = handler;	
	g_reactor.RegisterHandler(_handler, reactor::kReadEvent);
	_handler->OnRead([&](EventHandlerPtr evtHandler)
	{
		Task task;
		while (tasks_.pop_wait(&task, 0)) {
			task();
		}				
	}
	);
}


HSHA::~HSHA()
{
	if (_codec!=nullptr)
	{
		delete _codec;
		_codec = nullptr;
	}
	close(wakeupFds_[0]);
	close(wakeupFds_[1]);
}

void HSHA::OnMsg(CodecBase* codec, OnMsgCallBack cb)
{
	_codec = codec;
	OnClient([=](reactor::handle_t handle){
		TcpConHandlerPtr  handler(new TcpConHandler(handle));		
		handler->OnMsgCall(_codec->clone(), [=](EventHandlerPtr hler, const char* szByte, int len) {						
			g_reactor.RegisterHandler(_handler, reactor::kReadEvent);			
			std::string s(szByte, len);			
			threadPool_.addTask([=]{				
				_SafeCall([=] 
				{					
					cb(hler, s.c_str(), s.length()); 
					g_reactor.RegisterHandler(hler, reactor::kWriteEvent);
				}
				);
			});			
		});
		return handler;
	}
	);
}


void HSHA::_SafeCall(Task&& task)
{
	tasks_.push(move(task)); 
	_Wakeup(); 
}

void HSHA::_Wakeup() 
{
	int r = write(wakeupFds_[1], "1", 1);
	fatalif(r <= 0, "write error wd %d %d %s", r, errno, strerror(errno));
}
